package main

import (
	"context"
	"dash/importer"
	"dash/versions"
	"database/sql"
	"flag"
	"fmt"
	"github.com/Shopify/sarama"
	"github.com/go-kit/kit/log"
	"github.com/go-kit/kit/log/level"
	_ "github.com/go-sql-driver/mysql"
	"github.com/olivere/elastic/v7"
	"gopkg.in/yaml.v3"
	"io/ioutil"
	"os"
)

// cfg holds the process configuration. It is filled in once by init, which
// decodes config.yaml into it; the yaml tag on each field is the key it is
// read from.
var cfg struct {
	// Connection strings: ElasticDSN is passed to elastic.SetURL, MysqlDSN to
	// sql.Open, and KafkaDSN is used as the single broker address.
	ElasticDSN string `yaml:"elasticDSN"`
	MysqlDSN   string `yaml:"mysqlDSN"`
	KafkaDSN   string `yaml:"kafkaDSN"`

	// ConsumerGroupId is the Kafka consumer group this process joins.
	ConsumerGroupId string `yaml:"consumerGroupId"`

	// Kafka topic names. main registers a handler for every topic here except
	// LogTopic, and subscribes to every topic except LogTopic and
	// HeartBeatTopic. LogTopic is not referenced anywhere else in this file.
	TextMessageTopic            string `yaml:"textMessageTopic"`
	OtherMessageTopic           string `yaml:"otherMessageTopic"`
	CollectorJoinChatroomTopic  string `yaml:"collectorJoinChatroomTopic"`
	CollectorLeaveChatroomTopic string `yaml:"collectorLeaveChatroomTopic"`
	MemberJoinChatroomTopic     string `yaml:"memberJoinChatroomTopic"`
	MemberLeaveChatroomTopic    string `yaml:"memberLeaveChatroomTopic"`
	ChatroomMemberTopic         string `yaml:"chatroomMemberTopic"`
	LogTopic                    string `yaml:"logTopic"`
	HeartBeatTopic              string `yaml:"heartBeatTopic"`

	// MessageIndex is handed to importer.NewRepository; presumably the
	// Elasticsearch index that messages are written to (confirm in importer).
	MessageIndex string `yaml:"messageIndex"`
}

// init loads config.yaml from the current working directory into cfg.
//
// It panics when the file cannot be read or cannot be decoded, so the process
// never starts with a partially populated configuration. The panic value is
// an error that names the failing step and the file and wraps the underlying
// cause.
//
// Because init runs before main, the file must be present even when the
// binary is only invoked with -v.
func init() {
	const path = "config.yaml"

	raw, err := ioutil.ReadFile(path)
	if err != nil {
		panic(fmt.Errorf("loading config: %w", err))
	}
	if err := yaml.Unmarshal(raw, &cfg); err != nil {
		// The decoder's own message does not mention the file, so add it.
		panic(fmt.Errorf("parsing %s: %w", path, err))
	}
}

// printVersion is the -v command-line flag. When it is set, main prints
// versions.Version and exits without starting the importer.
var printVersion = flag.Bool("v", false, "version")

func main() {
	flag.Parse()
	if *printVersion {
		fmt.Println(versions.Version)
		os.Exit(0)
	}

	var logger log.Logger
	{
		logger = log.NewJSONLogger(log.NewSyncWriter(os.Stderr))
		logger = log.With(logger, "svc", "importer")
		logger = log.With(logger, "ts", log.DefaultTimestampUTC)
		logger = log.With(logger, "caller", log.DefaultCaller)
	}

	var esClient *elastic.Client
	{
		var err error
		esClient, err = elastic.NewSimpleClient(elastic.SetURL(cfg.ElasticDSN))
		if err != nil {
			_ = level.Error(logger).Log("error", err)
			os.Exit(1)
		}
	}

	var mysqlDB *sql.DB
	{
		db, err := sql.Open("mysql", cfg.MysqlDSN)
		if err != nil {
			_ = level.Error(logger).Log("error", err.Error())
			os.Exit(1)
		}
		mysqlDB = db
	}

	repo := importer.NewRepository(logger, esClient, mysqlDB, cfg.MessageIndex)
	ch := importer.NewConsumerGroupHandler(logger)
	{
		messageHandler := importer.NewMessageTopicHandler(logger, repo)
		ch.AddTopicHandler(cfg.TextMessageTopic, messageHandler)
		ch.AddTopicHandler(cfg.OtherMessageTopic, messageHandler)
		ch.AddTopicHandler(cfg.CollectorJoinChatroomTopic, importer.NewCollectorJoinChatroomTopicHandler(logger, repo))
		ch.AddTopicHandler(cfg.CollectorLeaveChatroomTopic, importer.NewCollectorLeaveChatroomTopicHandler(logger, repo))
		ch.AddTopicHandler(cfg.MemberJoinChatroomTopic, importer.NewMemberJoinChatroomTopicHandler(logger, repo))
		ch.AddTopicHandler(cfg.MemberLeaveChatroomTopic, importer.NewMemberLeaveChatroomTopicHandler(logger, repo))
		ch.AddTopicHandler(cfg.ChatroomMemberTopic, importer.NewChatroomMemberTopicHandler(logger, repo))
		ch.AddTopicHandler(cfg.HeartBeatTopic, importer.NewHeartbeatTopicHandler(logger))
	}

	var kafkaConfig *sarama.Config
	{
		kafkaConfig = sarama.NewConfig()
		kafkaConfig.Version = sarama.V2_4_0_0
		kafkaConfig.Consumer.Offsets.Initial = sarama.OffsetOldest
		kafkaConfig.Consumer.Return.Errors = true
	}

	var consumerGroup sarama.ConsumerGroup
	{
		cg, err := sarama.NewConsumerGroup([]string{cfg.KafkaDSN}, cfg.ConsumerGroupId, kafkaConfig)
		if err != nil {
			_ = logger.Log("err", err.Error())
			os.Exit(1)
		}
		consumerGroup = cg
	}
	defer func() {
		err := consumerGroup.Close()
		if err != nil {
			_ = logger.Log("err", err.Error())
		}
	}()
	go func() {
		for err := range consumerGroup.Errors() {
			_ = logger.Log("err", err.Error())
		}
	}()
	topics := []string{
		cfg.TextMessageTopic,
		cfg.OtherMessageTopic,
		cfg.CollectorJoinChatroomTopic,
		cfg.CollectorLeaveChatroomTopic,
		cfg.MemberJoinChatroomTopic,
		cfg.MemberLeaveChatroomTopic,
		cfg.ChatroomMemberTopic,
	}
	for {
		err := consumerGroup.Consume(context.Background(), topics, ch)
		if err != nil {
			_ = logger.Log("err", err.Error())
			os.Exit(1)
		}
		_ = logger.Log("err", "consumer group handler return")
	}
}
